1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package storm.starter.bolt;
19  
20  import backtype.storm.Config;
21  import backtype.storm.topology.BasicOutputCollector;
22  import backtype.storm.topology.OutputFieldsDeclarer;
23  import backtype.storm.tuple.Fields;
24  import backtype.storm.tuple.Tuple;
25  import backtype.storm.tuple.Values;
26  import com.google.common.collect.Lists;
27  import org.testng.annotations.DataProvider;
28  import org.testng.annotations.Test;
29  import storm.starter.tools.MockTupleHelpers;
30  
31  import java.util.Map;
32  
33  import static org.fest.assertions.api.Assertions.assertThat;
34  import static org.mockito.Matchers.any;
35  import static org.mockito.Mockito.*;
36  
37  public class IntermediateRankingsBoltTest {
38  
39    private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
40    private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
41    private static final Object ANY_OBJECT = new Object();
42    private static final int ANY_TOPN = 10;
43    private static final long ANY_COUNT = 42;
44  
45    private Tuple mockRankableTuple(Object obj, long count) {
46      Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
47      when(tuple.getValues()).thenReturn(Lists.newArrayList(ANY_OBJECT, ANY_COUNT));
48      return tuple;
49    }
50  
51    @DataProvider
52    public Object[][] illegalTopN() {
53      return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
54    }
55  
56    @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
57    public void negativeOrZeroTopNShouldThrowIAE(int topN) {
58      new IntermediateRankingsBolt(topN);
59    }
60  
61    @DataProvider
62    public Object[][] illegalEmitFrequency() {
63      return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
64    }
65  
66    @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
67    public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
68      new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
69    }
70  
71    @DataProvider
72    public Object[][] legalTopN() {
73      return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
74    }
75  
76    @Test(dataProvider = "legalTopN")
77    public void positiveTopNShouldBeOk(int topN) {
78      new IntermediateRankingsBolt(topN);
79    }
80  
81    @DataProvider
82    public Object[][] legalEmitFrequency() {
83      return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
84    }
85  
86    @Test(dataProvider = "legalEmitFrequency")
87    public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
88      new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
89    }
90  
91    @Test
92    public void shouldEmitSomethingIfTickTupleIsReceived() {
93      // given
94      Tuple tickTuple = MockTupleHelpers.mockTickTuple();
95      BasicOutputCollector collector = mock(BasicOutputCollector.class);
96      IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
97  
98      // when
99      bolt.execute(tickTuple, collector);
100 
101     // then
102     // verifyZeroInteractions(collector);
103     verify(collector).emit(any(Values.class));
104   }
105 
106   @Test
107   public void shouldEmitNothingIfNormalTupleIsReceived() {
108     // given
109     Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
110     BasicOutputCollector collector = mock(BasicOutputCollector.class);
111     IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
112 
113     // when
114     bolt.execute(normalTuple, collector);
115 
116     // then
117     verifyZeroInteractions(collector);
118   }
119 
120   @Test
121   public void shouldDeclareOutputFields() {
122     // given
123     OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
124     IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
125 
126     // when
127     bolt.declareOutputFields(declarer);
128 
129     // then
130     verify(declarer, times(1)).declare(any(Fields.class));
131   }
132 
133   @Test
134   public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
135     // given
136     IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
137 
138     // when
139     Map<String, Object> componentConfig = bolt.getComponentConfiguration();
140 
141     // then
142     assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
143     Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
144     assertThat(emitFrequencyInSeconds).isGreaterThan(0);
145   }
146 }